/*******************************************************************************
 * Pentaho Big Data
 * <p>
 * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
 * <p>
 * ******************************************************************************
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 ******************************************************************************/

package org.pentaho.hadoop.shim;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSelectInfo;
import org.apache.commons.vfs2.FileSelector;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileType;
import org.apache.commons.vfs2.impl.DefaultFileSystemManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.plugins.KettleURLClassLoader;
import org.pentaho.di.core.util.StringUtil;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.hadoop.shim.api.ConfigurationException;
import org.pentaho.hadoop.shim.api.internal.ActiveHadoopConfigurationLocator;
import org.pentaho.hadoop.shim.api.internal.Required;
import org.pentaho.hadoop.shim.api.internal.ShimProperties;
import org.pentaho.hadoop.shim.spi.FormatShim;
import org.pentaho.hadoop.shim.spi.HadoopConfigurationProvider;
import org.pentaho.hadoop.shim.spi.HadoopShim;
import org.pentaho.hadoop.shim.spi.PentahoHadoopShim;
import org.pentaho.hadoop.shim.spi.PigShim;
import org.pentaho.hadoop.shim.spi.SnappyShim;
import org.pentaho.hadoop.shim.spi.SqoopShim;
import org.pentaho.hadoop.shim.spi.HBaseShim;
import org.pentaho.hadoop.shim.api.internal.oozie.OozieClientFactory;

/**
 * A file-based Hadoop configuration provider that knows how to load Hadoop configurations from a VFS file system. This
 * class is not thread-safe.
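 * <p>
 * A minimal usage sketch (the directory and active id are illustrative, the cast assumes the default VFS manager is
 * in use, and the locator is shown as a lambda over {@code ActiveHadoopConfigurationLocator}):
 * </p>
 * <pre>{@code
 * HadoopConfigurationLocator locator = new HadoopConfigurationLocator();
 * DefaultFileSystemManager fsm = (DefaultFileSystemManager) VFS.getManager();
 * locator.init( fsm.resolveFile( "file:///opt/pentaho/hadoop-configurations" ),
 *   () -> "hdp26", fsm );
 * HadoopConfiguration active = locator.getActiveConfiguration();
 * }</pre>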
 */
public class HadoopConfigurationLocator implements HadoopConfigurationProvider {
  private static final String JAR_EXTENSION = ".jar";

  private static final String CONFIG_PROPERTIES_FILE = "config.properties";

  private static final String CONFIG_PROPERTY_IGNORE_CLASSES = "ignore.classes";

  private static final String CONFIG_PROPERTY_EXCLUDE_JARS = "exclude.jars";

  private static final String SHIM_CLASSPATH_IGNORE = "classpath.ignore";

  private static final String CONFIG_PROPERTY_CLASSPATH = "classpath";

  private static final String CONFIG_PROPERTY_LIBRARY_PATH = "library.path";

  private static final String CONFIG_PROPERTY_NAME = "name";

  private static final String PMR_PROPERTIES = "pmr.properties";

  private static final URL[] EMPTY_URL_ARRAY = new URL[ 0 ];

  private static final Class<?> PKG = HadoopConfigurationLocator.class;

  private Logger logger = LogManager.getLogger( getClass() );

  /**
   * The set of shim types to load from each Hadoop configuration, keyed by SPI interface and mapped to the name of
   * the implementation class to instantiate. TODO Externalize this list so we may configure it per installation
   */
  private static final Map<Class<? extends PentahoHadoopShim>, String> SHIM_TYPES =
    new HashMap<Class<? extends PentahoHadoopShim>, String>();

  private static final PentahoHadoopShim[] EMPTY_SHIM_ARRAY = new PentahoHadoopShim[ 0 ];

  /**
   * Currently known shim configurations
   */
  private Map<String, HadoopConfiguration> configurations;

  /**
   * Flag indicating we've been initialized. We require initialization to know where to look for Hadoop configurations
   * on disk.
   */
  private boolean initialized;

  /**
   * Used to determine the active Hadoop configuration at runtime
   */
  private ActiveHadoopConfigurationLocator activeLocator;

  /**
   * The file system manager used to provide shims a way to register their
   * {@link org.apache.commons.vfs2.provider.FileProvider} implementations.
   */
  private HadoopConfigurationFileSystemManager fsm;

  private DefaultFileSystemManager defaultFsm;

  /**
   * Initialize this locator with a directory in which to look for Hadoop configurations.
   *
   * @param baseDir       Directory to look for Hadoop configurations in
   * @param activeLocator A locator for resolving the current active Hadoop configuration
   * @param fsm           A file system manager to inject VFS file providers into from any loaded Hadoop configuration
   * @throws ConfigurationException Error loading the active Hadoop configuration from {@code baseDir}
   */
  public void init( FileObject baseDir,
                    ActiveHadoopConfigurationLocator activeLocator,
                    DefaultFileSystemManager fsm ) throws ConfigurationException {

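    // Map each shim SPI to the implementation class instantiated from the configuration's class loader.
    // An empty class name means no implementation is registered; locateServiceImpl() resolves those to null.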
    SHIM_TYPES.put( HadoopShim.class, "org.pentaho.hadoop.shim.HadoopShim" );
    SHIM_TYPES.put( HBaseShim.class, "org.pentaho.hbase.shim.common.HBaseShimImpl" );
    SHIM_TYPES.put( PigShim.class, "" );
    SHIM_TYPES.put( FormatShim.class, "org.pentaho.hadoop.shim.common.CommonFormatShim" );
    SHIM_TYPES.put( SnappyShim.class, "org.pentaho.hadoop.shim.common.SnappyShimImpl" );
    SHIM_TYPES.put( SqoopShim.class, "" );
    SHIM_TYPES.put( OozieClientFactory.class, "" );

    if ( baseDir == null ) {
      throw new NullPointerException( FileObject.class.getSimpleName()
        + " is required" );
    }
    if ( activeLocator == null ) {
      throw new NullPointerException(
        ActiveHadoopConfigurationLocator.class.getSimpleName()
          + " is required" );
    }
    if ( fsm == null ) {
      throw new NullPointerException(
        DefaultFileSystemManager.class.getSimpleName() + " is required" );
    }
    this.defaultFsm = fsm;
    this.fsm = new HadoopConfigurationFileSystemManager( this, fsm );
    findHadoopConfigurations( baseDir, activeLocator );
    this.activeLocator = activeLocator;
    initialized = true;
  }

  /**
   * Attempt to find any Hadoop configuration as a direct descendant of the provided directory.
   *
   * @param baseDir       Directory to look for Hadoop configurations in
   * @param activeLocator A locator providing the id of the active Hadoop configuration to load
   * @throws ConfigurationException Error scanning {@code baseDir} or loading the matching configuration
   */
  private void findHadoopConfigurations( FileObject baseDir, ActiveHadoopConfigurationLocator activeLocator )
    throws ConfigurationException {
    configurations = new HashMap<String, HadoopConfiguration>();
    try {
      if ( !baseDir.exists() ) {
        throw new ConfigurationException(
          BaseMessages.getString( PKG, "Error.HadoopConfigurationDirectoryDoesNotExist", baseDir.getURL() ) );
      }
      for ( FileObject f : baseDir.findFiles( new FileSelector() {
        @Override
        public boolean includeFile( FileSelectInfo info ) throws Exception {
          return info.getDepth() == 1
            && FileType.FOLDER.equals( info.getFile().getType() );
        }

        @Override
        public boolean traverseDescendents( FileSelectInfo info )
          throws Exception {
          return info.getDepth() == 0;
        }
      } ) ) {
        // Only load the specified configuration (ID should match the basename, we allow case-insensitivity)
        if ( f.getName().getBaseName().equalsIgnoreCase( activeLocator.getActiveConfigurationId() ) ) {
          HadoopConfiguration config = loadHadoopConfiguration( f );
          if ( config != null ) {
            configurations.put( config.getIdentifier(), config );
          }
        }
      }
    } catch ( FileSystemException ex ) {
      throw new ConfigurationException(
        BaseMessages.getString( PKG, "Error.UnableToLoadConfigurations", baseDir.getName().getFriendlyURI() ), ex );
    }
  }

  /**
   * Excludes the jars named in the {@code exclude.jars} property of {@code config.properties} from the list of URLs.
   *
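   * <p>
   * A minimal sketch of the effect (jar names are illustrative):
   * </p>
   * <pre>{@code
   * List<URL> urls = new ArrayList<>( Arrays.asList(
   *   new URL( "file:///shim/lib/guava-14.0.1.jar" ),
   *   new URL( "file:///shim/lib/pig-0.16.0.jar" ) ) );
   * // "guava" matches the versioned guava jar; only the pig jar remains
   * urls = filterJars( urls, "guava" );
   * }</pre>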
   * @param urls                 the list of all the URLs to add to the class loader
   * @param excludedJarsProperty exclude.jars property from a config.properties file
   * @return The rest of the jars in {@code urls} after excluding the jars listed in {@code excludedJarsProperty}.
   */

  protected List<URL> filterJars( List<URL> urls, String excludedJarsProperty ) {
    if ( excludedJarsProperty == null || excludedJarsProperty.trim().isEmpty() ) {
      return urls;
    }
    for ( String excludedJar : excludedJarsProperty.split( "," ) ) {
      String excluded = excludedJar.toLowerCase();
      // Matches versioned jars, e.g. an entry of "guava" matches ".../guava-14.0.1.jar"
      Matcher matcher = Pattern.compile( ".*/" + excluded + "-.*\\.jar$" ).matcher( "" );
      Iterator<URL> iterator = urls.iterator();
      while ( iterator.hasNext() ) {
        String url = iterator.next().toString().toLowerCase();
        if ( url.contains( excluded )
          && ( excludedJar.endsWith( ".jar" ) || url.contains( excluded + ".jar" )
            || matcher.reset( url ).matches() ) ) {
          iterator.remove();
        }
      }
    }
    return urls;
  }

  /**
   * Find all jar files within the path provided.
   *
   * @param path     Path to search for jar files within
   * @param maxdepth Maximum folder depth to descend to, relative to {@code path} ({@code 0} collects only jars
   *                 directly inside {@code path})
   * @param paths    Path suffixes to skip while traversing and collecting jars
   * @return All jars found within {@code path} down to the requested depth.
   * @throws FileSystemException Error traversing {@code path}
   */
  private List<URL> findJarsIn( FileObject path, final int maxdepth, final Set<String> paths )
    throws FileSystemException {
    FileObject[] jars = path.findFiles( new FileSelector() {
      @Override
      public boolean includeFile( FileSelectInfo info ) throws Exception {
        for ( String path : paths ) {
          if ( info.getFile().getURL().toString().endsWith( path ) ) {
            return false;
          }
        }
        return info.getFile().getName().getBaseName().endsWith( JAR_EXTENSION );
      }

      @Override
      public boolean traverseDescendents( FileSelectInfo info ) throws Exception {
        for ( String path : paths ) {
          if ( info.getFile().getURL().toString().endsWith( path ) ) {
            return false;
          }
        }
        return info.getDepth() <= maxdepth;
      }
    } );

    List<URL> jarUrls = new ArrayList<URL>();
    for ( FileObject jar : jars ) {
      jarUrls.add( jar.getURL() );
    }
    return jarUrls;
  }

  private void checkInitialized() {
    if ( !initialized ) {
      throw new RuntimeException( BaseMessages.getString( PKG, "Error.LocatorNotInitialized" ) );
    }
  }

  /**
   * Locates an implementation of {@code service} by instantiating the implementation class registered for it in
   * {@link #SHIM_TYPES}.
   *
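   * <p>
   * A minimal usage sketch ({@code configurationClassLoader} stands in for a loader created by
   * {@link #createConfigurationLoader}):
   * </p>
   * <pre>{@code
   * HadoopShim hadoopShim = locateServiceImpl( configurationClassLoader, HadoopShim.class );
   * if ( hadoopShim == null ) {
   *   // no implementation is registered or loadable for this shim type
   * }
   * }</pre>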
   * @param cl      Class loader to look for the implementation in
   * @param service Shim SPI to locate an implementation of
   * @return The implementation found, or {@code null} if none could be instantiated.
   */
  @SuppressWarnings( "unchecked" )
  protected <T> T locateServiceImpl( ClassLoader cl, Class<T> service ) {
    // Refactored to use a java.util.Map instead of providing the interface implementations via a ServiceLoader.
    // If this approach does not work then we should switch back to using ServiceLoader and expose each
    // implementation in each shim via META-INF/services/org.pentaho.hadoop.shim.spi.HadoopShim
    try {
      return (T) cl.loadClass( SHIM_TYPES.get( service ) ).getDeclaredConstructor().newInstance();
    } catch ( Exception e ) {
      // No class name registered (empty mapping) or the class is missing from this configuration's
      // class loader; the shim is treated as unavailable
      return null;
    }
  }

  /**
   * Create a ClassLoader to load resources for a {@code HadoopConfiguration}.
   *
   * @param root           Configuration root directory
   * @param parent         Parent class loader to delegate to if resources cannot be found in the configuration's
   *                       directory or provided classpath
   * @param classpathUrls  Additional URLs to add to the class loader. These will be added before any internal
   *                       resources.
   * @param configurationProperties The configuration's {@code config.properties} contents, consulted for the
   *                                {@code classpath.ignore} and {@code exclude.jars} properties
   * @param ignoredClasses Classes (or packages) that should not be loaded by the class loader
   * @return A class loader capable of loading a Hadoop configuration located at {@code root}.
   * @throws ConfigurationException Error creating a class loader for the Hadoop configuration located at {@code root}
   */
  protected ClassLoader createConfigurationLoader( FileObject root,
                                                   ClassLoader parent, List<URL> classpathUrls,
                                                   ShimProperties configurationProperties, String... ignoredClasses )
    throws ConfigurationException {
    try {
      if ( root == null || !FileType.FOLDER.equals( root.getType() ) ) {
        throw new IllegalArgumentException( "root must be a folder: " + root );
      }

      // Find all jar files directly within the configuration folder (depth 0: subdirectories are not traversed)
      List<URL> jars = findJarsIn( root, 0, configurationProperties.getConfigSet( SHIM_CLASSPATH_IGNORE ) );

      // Add the root of the configuration
      jars.add( 0, new URL( root.getURL().toExternalForm() + "/" ) );
      // Inject any overriding URLs before all other paths
      if ( classpathUrls != null ) {
        jars.addAll( 0, classpathUrls );
      }
      //Exclude jars contained in exclude.jars property in config.properties file from the list of jars
      jars = filterJars( jars, configurationProperties.getProperty( CONFIG_PROPERTY_EXCLUDE_JARS ) );

      ClassLoader cl = new HadoopConfigurationClassLoader( jars.toArray( EMPTY_URL_ARRAY ),
        parent, ignoredClasses );
      return cl;
    } catch ( Exception ex ) {
      throw new ConfigurationException( BaseMessages.getString( PKG, "Error.CreatingClassLoader" ), ex );
    }
  }

  private Properties getPmrProperties() {
    Properties properties = new Properties();
    // pmr.properties, when present on the classpath, signals that we are running as a Pentaho MapReduce task
    try ( InputStream pmrProperties = getClass().getClassLoader().getResourceAsStream( PMR_PROPERTIES ) ) {
      if ( pmrProperties != null ) {
        properties.load( pmrProperties );
      }
    } catch ( IOException ioe ) {
      // pmr.properties not readable; treat it as not available
    }
    return properties;
  }

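  /**
   * @return {@code true} if the {@code isPmr} flag in {@code pmr.properties} indicates this code is executing within
   * a Pentaho MapReduce task on the cluster; in that case the optional {@code classpath} property is not parsed.
   */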
  @VisibleForTesting
  boolean isRunningOnCluster() {
    Properties pmrProperties = getPmrProperties();
    return "true".equals( pmrProperties.getProperty( "isPmr", "false" ) );
  }

  /**
   * Parse a set of URLs from a comma-separated list of URLs. If a URL points to a directory, all jar files within
   * that directory are returned as well.
   *
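   * <p>
   * A usage sketch ({@code shimFolder} stands in for the configuration's root folder; the paths are illustrative):
   * </p>
   * <pre>{@code
   * // classpath=lib,lib/client/hadoop-common.jar
   * List<URL> urls = parseURLs( shimFolder, "lib,lib/client/hadoop-common.jar" );
   * // -> the lib/ directory URL, the jars found under it, then the explicit hadoop-common.jar
   * }</pre>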
   * @param root      Directory to resolve relative paths against
   * @param urlString Comma-separated list of URLs (relative or absolute)
   * @return List of URLs resolved from {@code urlString}
   */
  protected List<URL> parseURLs( FileObject root, String urlString ) {
    if ( urlString == null || urlString.trim().isEmpty() ) {
      return Collections.emptyList();
    }
    String[] paths = urlString.split( "," );
    List<URL> urls = new ArrayList<URL>();
    for ( String path : paths ) {
      try {
        FileObject file = root.resolveFile( path.trim() );
        if ( !file.exists() ) {
          file = defaultFsm.resolveFile( path.trim() );
        }
        if ( FileType.FOLDER.equals( file.getType() ) ) {
          // Add directories with a trailing / so the URL ClassLoader interprets
          // them as directories
          urls.add( new URL( file.getURL().toExternalForm() + "/" ) );
          // Also add all jars within this directory
          urls.addAll( findJarsIn( file, 1, new HashSet<String>() ) );
        } else {
          urls.add( file.getURL() );
        }
      } catch ( Exception e ) {
        // Log invalid path
        logger.error( BaseMessages.getString( PKG, "Error.InvalidClasspathEntry", path ) );
      }
    }
    return urls;
  }

  /**
   * Attempt to discover a valid Hadoop configuration from the provided folder.
   *
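   * <p>
   * A sketch of the {@code config.properties} keys this method consumes (values are illustrative, not taken from a
   * shipped shim):
   * </p>
   * <pre>{@code
   * # display name; defaults to the folder name
   * name=My Hadoop Distribution
   * # extra classpath entries, resolved relative to this folder (ignored when running on the cluster)
   * classpath=lib,lib/client
   * # classes or packages the configuration class loader must not load
   * ignore.classes=org.slf4j.
   * # jars to drop from the class loader, matched with or without version suffixes
   * exclude.jars=guava,slf4j-log4j12
   * # native library directories to register
   * library.path=/opt/hadoop/lib/native
   * # classes that must be loadable for the configuration to be valid
   * required.classes=org.apache.hadoop.conf.Configuration
   * }</pre>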
   * @param folder Folder that may represent a Hadoop configuration
   * @return A Hadoop configuration for the folder provided, or {@code null} if none is found.
   * @throws ConfigurationException Error when loading the Hadoop configuration.
   */
  protected HadoopConfiguration loadHadoopConfiguration( FileObject folder ) throws ConfigurationException {
    ShimProperties configurationProperties = new ShimProperties();

    try {
      FileObject configFile = folder.getChild( CONFIG_PROPERTIES_FILE );
      if ( configFile != null ) {
        configurationProperties.putAll( loadProperties( configFile ) );
      }
    } catch ( Exception ex ) {
      throw new ConfigurationException(
        BaseMessages.getString( PKG, "Error.UnableToLoadConfigurationProperties", CONFIG_PROPERTIES_FILE ) );
    }

    for ( Entry<String, String> entry : configurationProperties.getPrefixedProperties( "java.system" ).entrySet() ) {
      System.setProperty( entry.getKey(), entry.getValue() );
    }

    try {
      List<URL> classpathElements = null;
      if ( !isRunningOnCluster() ) {
        // Parse all URLs from an optional classpath from the configuration file
        classpathElements = parseURLs( folder,
          configurationProperties.getProperty( CONFIG_PROPERTY_CLASSPATH ) );
      }

      // Allow external configuration of classes to ignore
      String ignoredClassesProperty = configurationProperties
        .getProperty( CONFIG_PROPERTY_IGNORE_CLASSES );
      String[] ignoredClasses = null;
      if ( !StringUtil.isEmpty( ignoredClassesProperty ) ) {
        ignoredClasses = ignoredClassesProperty.split( "," );
      }

      // Pass our class loader in as the parent of the configuration's class loader so it can find the same API
      // classes we're using
      ClassLoader cl = createConfigurationLoader( folder, getClass()
        .getClassLoader(), classpathElements, configurationProperties, ignoredClasses );
      verifyClasses( cl, configurationProperties.getProperty( "required.classes" ),
        configurationProperties.getProperty( CONFIG_PROPERTY_NAME ) );

      // Treat the Hadoop shim special. It is absolutely required for a Hadoop configuration.
      HadoopShim hadoopShim = null;
      List<PentahoHadoopShim> shims = new ArrayList<PentahoHadoopShim>();
      // Attempt to locate a shim within this folder
      for ( Class<? extends PentahoHadoopShim> shimType : SHIM_TYPES.keySet() ) {
        PentahoHadoopShim s = locateServiceImpl( cl, shimType );
        if ( s == null && shimType.getAnnotation( Required.class ) != null ) {
          logger.warn( BaseMessages.getString( PKG, "Error.MissingRequiredShim", shimType.getSimpleName() ) );
          // Do not continue to load the configuration if we are missing a required shim
          return null;
        }
        if ( HadoopShim.class.isAssignableFrom( shimType ) ) {
          hadoopShim = (HadoopShim) s;
        } else {
          shims.add( s );
        }
      }
      String id = folder.getName().getBaseName();
      String name = configurationProperties.getProperty( CONFIG_PROPERTY_NAME, id );

      HadoopConfiguration config = new HadoopConfiguration( configurationProperties, folder, id, name, hadoopShim,
        shims.toArray( EMPTY_SHIM_ARRAY ) );

      // Register native libraries after everything else has been loaded successfully
      registerNativeLibraryPaths( configurationProperties.getProperty( CONFIG_PROPERTY_LIBRARY_PATH ) );

      hadoopShim.onLoad( config, fsm );
      return config;
    } catch ( Throwable t ) {
      throw new ConfigurationException(
        BaseMessages.getString( PKG, "Error.LoadingConfiguration" ) + " " + t.toString(), t );
    }
  }

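  /**
   * Verify that every class in a comma-separated list of class names can be loaded with the given class loader.
   *
   * @param classLoader     Class loader to load the classes with
   * @param requiredClasses Comma-separated list of fully qualified class names; may be null or empty
   * @param shimName        Name of the shim, used in the error message
   * @throws ConfigurationException If any of the classes cannot be loaded
   */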
  protected void verifyClasses( ClassLoader classLoader, String requiredClasses, String shimName )
    throws ConfigurationException {
    if ( !Const.isEmpty( requiredClasses ) ) {
      for ( String className : requiredClasses.split( "," ) ) {
        try {
          classLoader.loadClass( className );
        } catch ( Throwable e ) {
          throw new ConfigurationException(
            BaseMessages.getString( PKG, "Error.MissingRequiredClasses", className, shimName ) );
        }
      }
    }
  }

  /**
   * Register a comma-separated list of native library paths.
   *
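   * <p>
   * For example (paths are illustrative):
   * </p>
   * <pre>{@code
   * // after this call System.loadLibrary can resolve libraries in either directory
   * registerNativeLibraryPaths( "/opt/hadoop/lib/native,/usr/lib/jni" );
   * }</pre>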
   * @param paths Comma-separated list of native library paths
   */
  protected void registerNativeLibraryPaths( String paths ) {
    if ( paths == null ) {
      return;
    }
    for ( String path : paths.split( "," ) ) {
      boolean successful = registerNativeLibraryPath( path );
      if ( !successful ) {
        logger.error( BaseMessages.getString( PKG, "Error.RegisteringLibraryPath", path ) );
      }
    }
  }

  /**
   * Dynamically register a native library path. This relies on an implementation detail of {@link ClassLoader}: its
   * private {@code usr_paths} field. On JVMs that forbid this reflective access the attempt fails and {@code false}
   * is returned.
   *
   * @param path Library path to add
   * @return {@code true} if the library path could be added successfully
   */
  protected boolean registerNativeLibraryPath( String path ) {
    if ( path == null ) {
      throw new NullPointerException();
    }
    path = path.trim();
    try {
      Field f = ClassLoader.class.getDeclaredField( "usr_paths" );
      boolean accessible = f.isAccessible();
      f.setAccessible( true );
      try {
        String[] paths = (String[]) f.get( null );

        // Make sure the path isn't already registered
        for ( String p : paths ) {
          if ( p.equals( path ) ) {
            return true; // Success, it's already there!
          }
        }

        String[] newPaths = new String[ paths.length + 1 ];
        System.arraycopy( paths, 0, newPaths, 0, paths.length );
        newPaths[ paths.length ] = path;
        f.set( null, newPaths );
        // Success!
        return true;
      } finally {
        f.setAccessible( accessible );
      }
    } catch ( Exception ex ) {
      // Something went wrong, definitely not successful
      return false;
    }
  }

  /**
   * Load the properties file located at {@code file}
   *
   * @param file Location of a properties file to load
   * @return Loaded properties file
   * @throws IOException         Error loading properties from file
   * @throws FileSystemException Error locating input stream for file
   */
  protected Properties loadProperties( FileObject file )
    throws FileSystemException, IOException {
    Properties p = new Properties();
    // Close the VFS content stream once the properties are loaded
    try ( InputStream in = file.getContent().getInputStream() ) {
      p.load( in );
    }
    return p;
  }

  @Override
  public List<HadoopConfiguration> getConfigurations() {
    checkInitialized();
    return new ArrayList<HadoopConfiguration>( configurations.values() );
  }

  @Override
  public boolean hasConfiguration( String id ) {
    checkInitialized();
    return configurations.containsKey( id );
  }

  @Override
  public HadoopConfiguration getConfiguration( String id )
    throws ConfigurationException {
    checkInitialized();
    HadoopConfiguration config = configurations.get( id );
    if ( config == null ) {
      throw new ConfigurationException( BaseMessages.getString( PKG, "Error.UnknownHadoopConfiguration", id ) );
    }
    return config;
  }

  @Override
  public HadoopConfiguration getActiveConfiguration()
    throws ConfigurationException {
    return getConfiguration( activeLocator.getActiveConfigurationId() );
  }
}